适用于版本3.0.1。
新的消费者API 提供并行的、Kafka分区与Spark分区1:1、可以访问偏移和元数据的方式。与旧版本直接API在使用方式上不同。
1 连接
1 | groupId = org.apache.spark |
注意:不要直接引用kafka依赖。因为spark使用了过渡性依赖,影响了诊断方式的兼容性。
2 创建直接流
1 | import org.apache.kafka.clients.consumer.ConsumerRecord |
每个记录是一个ConsumerRecord对象。
更多配置详见Kafka consumer config docs
当批处理间隔大于Kafka默认的心跳会话超时(30s)时,需要增加heartbeat.interval.ms 和session.timeout.ms。
批处理间隔超过5min,还需要改变中介上的group.max.session.timeout.ms。
提交策略详见Storing Offsets。
3 位置策略
新API会预提取消息到缓冲中。为了提升性能,Spark会在执行器上缓存消费者(spark.streaming.kafka.consumer.cache.enabled),倾向于调度分区到相应的消费者位置。
PreferConsistent平均分配分区到执行器上,适合大多数场景;
PreferBrokers调度分区到leader所在的节点,适合执行器和broker处于相同节点;
PreferFixed允许显式执行分区-主机映射,其余平均分配,适合数据倾斜场景。
默认最大消费者缓存上限为64。如果需要处理超过64 * number of executors个分区,可以设置spark.streaming.kafka.consumer.cache.maxCapacity。
缓存通过主题分区和group.id识别,因此需要为每次调用createDirectStream,使用不同的group.id。
4 消费策略
ConsumerStrategies 允许Spark即使从检查点重启,也可以获取配置的消费者。
- Subscribe订阅固定的主题集合
- SubscribePattern使用正则指定主题,需要注意响应新加的主题分区
- Assign订阅固定的分区集合
以上方式都可以在构造器中指定偏移。
ConsumerStrategy可以自定义扩展。
5 创建RDD
1 | // Import dependencies and create kafka params as in Create Direct Stream above |
适用于离线处理消息,通过执行主题、分区和偏移。
注意:
不能使用PreferBrokers,因为流场景以外,没有驱动端的消费者自动查找元数据。如果需要,使用PreferFixed和自定义的元数据查找。
6 获取偏移
1 | stream.foreachRDD { rdd => |
注意:
- HasOffsetRanges类型转换仅在createDirectStream创建后的第一个方法调用中成功。
- RDD与Kafka分区一对一关系仅在shuffle或repartition前成立。
7 保存偏移
Kafka消息传递语义取决于偏移的存储方式和时机。Spark输出保证了至少一次语义。
想要实现刚好一次语义,必须实现以下一项:
- 在幂等输出后保存偏移
- 输出时在原子性事务中保存偏移
存储偏移主要有以下3种方式:
(1) 检查点
偏移将被保存在检查点中。
但是需要保证输出的幂等性,并且不能在代码变更后恢复检查点。
计划的代码更新可以同时运行新旧代码并保持输出幂等;而计划外的需要识别开始的偏移。
(2) Kafka
Kafka默认在消费者poll()成功后自动提交偏移。
可以使用异步提交commitAsync()在确认输出后提交偏移。
因为Kafka不是事务性,所以输出需要幂等。?
1 | stream.foreachRDD { rdd => |
注意:由于HasOffsetRanges的原因,CanCommitOffsets类型转换只对createDirectStream的结果有效。
(3) 自定义
实现支持事务的偏移存储。
尤其针对难以幂等的聚合输出。
1 | // The details depend on your data store, but the general idea looks like this |
8 SSL/TLS
新的Kafka消费者API支持SSL,仅用于Spark与Kafka之间的通信,内部节点间需要另外实现。
1 | val kafkaParams = Map[String, Object]( |
9 部署
将spark-streaming-kafka-0-10_2.12及其依赖打包
将spark-core_2.12和spark-streaming_2.12标记为provided
详见部署
10 安全
详见Structured Streaming Security
注意:Kafka原生的sink不可用,因此代理令牌只在消费端使用。
参考资料
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)